---
title: "Middleware"
icon: "repeat"
---

## Overview

Middleware in the BeeAI Framework is code that runs "in the middle" of an execution lifecycle—intercepting the flow between when a component (like an Agent, Tool, or Model) starts and when it finishes.

As these components execute, they emit events at key moments, such as starting a task, calling a tool, or completing a response. Middleware hooks into these events to inject behaviors like logging, filtering, or safety checks, all without modifying the component's core logic. This modular approach allows you to apply consistent policies across your entire system.

You can use built-in middleware such as `GlobalTrajectoryMiddleware` for immediate debugging, or write custom middleware to handle complex needs like blocking unsafe content, enforcing rate limits, or managing authentication.

<Note>
Note on Terminology: In this framework, **Middleware** refers to the classic software design pattern (pipeline interceptors) that runs between execution steps. This is distinct from the industry term "Agentic Middleware," which typically refers to entire orchestration platforms.
</Note>


## Built-in Middleware

The following section showcases built-in middleware that you can start using right away.

### Global Trajectory

The fastest way to understand your agent's execution flow is by using the `GlobalTrajectoryMiddleware`. It captures all events, including deeply-nested ones, and prints them to the console, using indentation to visualize the call stack.

**Example**

```py Python [expandable]
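from beeai_framework.agents.requirement import RequirementAgent
from beeai_framework.middleware.trajectory import GlobalTrajectoryMiddleware
from beeai_framework.tools.weather import OpenMeteoTool

agent = RequirementAgent(
    llm="ollama:granite3.3",
    tools=[OpenMeteoTool()],
)

# Attach the middleware to this single run
await agent.run("What's the current weather in Miami?").middleware(GlobalTrajectoryMiddleware())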
```

You can customize the output by passing parameters to the constructor:

| Parameter          | Description                                                                                              |
|--------------------|----------------------------------------------------------------------------------------------------------|
| `target`           | Specify a file or stream to which to write the trajectory (pass `False` to disable).                     |
| `included`         | List of classes to include in the trajectory.                                                            |
| `excluded`         | List of classes to exclude from the trajectory.                                                          |
| `pretty`           | Use pretty formatting for the trajectory.                                                                |
| `prefix_by_type`   | Customize how instances of individual classes should be printed.                                         |
| `exclude_none`     | Exclude `None` values from the printing.                                                                 |
| `enabled`          | Enable/Disable the logging.                                                                              |
| `match_nested`     | Whether to observe trajectories of nested run contexts.                                                  |
| `emitter_priority` | Setting higher priority may result in capturing events without any modifications from other middlewares. |

**Example**

```py Python [expandable]
from beeai_framework.middleware.trajectory import GlobalTrajectoryMiddleware
from beeai_framework.tools.tool import Tool
from beeai_framework.backend.chat import ChatModel
from beeai_framework.tools.weather.openmeteo import OpenMeteoTool
from beeai_framework.agents.base import BaseAgent

# Log only tool calls
GlobalTrajectoryMiddleware(included=[Tool])

# Log only tool calls except OpenMeteoTool
GlobalTrajectoryMiddleware(included=[Tool], excluded=[OpenMeteoTool])

# Log only ChatModel events
GlobalTrajectoryMiddleware(included=[ChatModel])

# Use a bee emoji for agents
GlobalTrajectoryMiddleware(prefix_by_type={BaseAgent: "🐝 "})
```

<Tip>You can listen to events emitted throughout the execution to build your custom trajectory.</Tip>


### Tool Call Streaming

This middleware handles streaming tool calls in a `ChatModel`. It observes stream updates from the [Chat Model](/modules/backend#chat-model) and parses tool calls on demand so that they can be consumed immediately. 

It works even without streaming enabled, in which case it emits the `update` event at the end of the execution.

**Example**

{/* <!-- embedme python/examples/middleware/llm_streaming.py --> */}
```py Python [expandable]
import asyncio

from beeai_framework.backend import ChatModel, UserMessage
from beeai_framework.emitter import EventMeta
from beeai_framework.middleware.stream_tool_call import StreamToolCallMiddleware, StreamToolCallMiddlewareUpdateEvent
from beeai_framework.tools.weather import OpenMeteoTool


async def main() -> None:
    llm = ChatModel.from_name("ollama:granite4:micro")
    weather_tool = OpenMeteoTool()
    middleware = StreamToolCallMiddleware(
        weather_tool,
        key="location_name",  # name is taken from the OpenMeteoToolInput schema
        match_nested=False,  # we are applying the middleware to the model directly
        force_streaming=True,  # we want to let middleware enable streaming on the model
    )

    @middleware.emitter.on("update")
    def log_thoughts(event: StreamToolCallMiddlewareUpdateEvent, meta: EventMeta) -> None:
        print(
            "Received update", event.delta, event.output_structured
        )  # event.delta contains an update of the 'location_name' field

    response = await llm.run([UserMessage("What's the current weather in New York?")], tools=[weather_tool]).middleware(
        middleware
    )
    print(response.get_tool_calls()[0].args)


if __name__ == "__main__":
    asyncio.run(main())

```

The following parameters can be passed to the constructor:

| Parameter         | Description                                                                                   |
|-------------------|-----------------------------------------------------------------------------------------------|
| `target`          | The tool whose call the middleware waits for.                                                 |
| `key`             | The name of the attribute in the tool's input schema whose value should be streamed.          |
| `match_nested`    | Whether the middleware should also apply to nested runs (`False` limits it to the top level). |
| `force_streaming` | Sets the stream flag on the `ChatModel`.                                                      |


## Core Primitives

The BeeAI Framework middleware is built on an underlying system of primitives, which are described in this section. Understanding these primitives is helpful for building complex middleware.

### Events

An event refers to an action initiated by a component. It carries the details of what just happened within the system.

Every event has three key properties:
- Name: A string identifier (e.g., `start`, `success`, `error`, or custom names like `fetch_data`).
- Data payload: The content of the event, typically structured as a Pydantic model.
- Metadata: Information about the context where the event was fired.

You process these events using callbacks that follow this structure:

```py Python [expandable]
from beeai_framework.backend.chat import ChatModelStartEvent
from beeai_framework.emitter import EventMeta

async def handler(data: ChatModelStartEvent, meta: EventMeta) -> None:
    print(f"Received event {meta.name} ({meta.path}) at {meta.created_at}")
    print("-> created by", type(meta.creator))
    # The payload lives on `data`; `meta` only describes where the event came from
    print("-> data payload", data.model_dump())
    print("-> context", meta.context)
    print("-> trace", meta.trace)
```

### Emitter

The **Emitter** is the core component that lets you send and watch for events. While it is typically attached to a specific class, you can also use it on its own.
An emitter instance is typically the child of a root emitter to which all events are propagated. Emitters can be nested (one can be a child of another), hence they internally create a tree hierarchy.

Every emitter instance has the following **properties**:

- `namespace` in which the emitter operates (e.g., `agents.requirement`, `tool.open_meteo`, ...).
- `creator` class which the given emitter belongs to.
- `context` (dictionary which is attached to all events emitted via the given emitter).
- `trace` metadata (such as current `id`, `run_id` and `parent_id`)

It also gives you the following **methods** for managing listeners:

- `on` for registering a new event listener
  - The method takes `matcher` (event name, callback, regex), `callback` (sync/async function), and `options` (priority, etc.)
  - The method can be used as a decorator or as a standalone function
- `off` for deregistering an event listener
- `pipe` for propagating all captured events to another emitter
- `child` for creating a child emitter

<Note>The event's `path` attribute is created by concatenating the `namespace` with the event name (e.g., `backend.chat.ollama.start`).</Note>
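
As a rough illustration of the note above, the path can be thought of as the namespace segments joined with dots, followed by the event name. The helper `build_event_path` below is hypothetical and only mirrors the documented behavior; it is not part of the framework.

```py Python [expandable]
def build_event_path(namespace: list[str], event_name: str) -> str:
    """Join the namespace segments and the event name with dots (illustrative only)."""
    return ".".join([*namespace, event_name])


path = build_event_path(["backend", "chat", "ollama"], "start")
print(path)  # backend.chat.ollama.start
```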

**Example**

The following example depicts a minimal application that does the following:

1. defines a data object for the `fetch_data` event,
2. creates an emitter from the root one,
3. registers a callback listening to the `fetch_data` event, which modifies its content,
4. fires the `fetch_data` event,
5. logs the modified event's data.

```py Python [expandable]
from pydantic import BaseModel
from beeai_framework.emitter import EventMeta, Emitter

# Define a data model for the event
class FetchDataEvent(BaseModel):
    url: str

# Create an emitter and pass events
emitter = Emitter.root().child(
    namespace=["app"],
    events={"fetch_data": FetchDataEvent} # optional
)

# Listen to an event
@emitter.on("fetch_data")
def handle_event(data: FetchDataEvent, meta: EventMeta) -> None:
    print(f"Retrieved event {meta}")
    data.url = "https://mywebsite.com"

# Create and emit the event
data = FetchDataEvent(url="https://example.com")
await emitter.emit("fetch_data", data)
print(data.url) # "https://mywebsite.com"

# Deregister a callback (optional)
emitter.off(callback=handle_event)
```

<Note>The `emitter.on` can be used directly and not just as a decorator. Example: `emitter.on("fetch_data", callback)`.</Note>

<Tip>If you name your function as either `handle_{event_name}` or `on_{event_name}`, then you don't need to provide the event name as a parameter, as it gets inferred automatically.</Tip>
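
The naming convention from the tip can be pictured with a small standalone sketch. The function `infer_event_name` is a hypothetical helper written for this page, and the framework's real inference logic may differ in its details; the sketch only shows which handler names map to which event names.

```py Python [expandable]
from typing import Optional


def infer_event_name(func_name: str) -> Optional[str]:
    """Return the event name implied by a handler's name, or None if no prefix matches."""
    for prefix in ("handle_", "on_"):
        if func_name.startswith(prefix) and len(func_name) > len(prefix):
            return func_name[len(prefix):]
    return None


def handle_fetch_data(data, meta) -> None: ...
def on_fetch_data(data, meta) -> None: ...
def process(data, meta) -> None: ...


for fn in (handle_fetch_data, on_fetch_data, process):
    print(fn.__name__, "->", infer_event_name(fn.__name__))
```

A handler such as `process` carries no recognizable prefix, so in that case the event name still has to be passed explicitly.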

### Run (Context)

The `Run` class **acts as a wrapper of the target implementation** with its **own lifecycle** (an emitter with a set of events) and **context** (data that gets propagated to all events).

The `RunContext` class is a container that stores information about the current execution context.

These abstractions allow you to:

- modify `input` to the given target (listen to a `start` event and modify the content of the `input` property),
- modify `output` from the given target (listen to a `success` event and modify the content of the `output` property),
- stop the run early (listen to a `start` event and set the `output` property to a non-`None` value),
- propagate `context` (dictionary) to any component of your system,
- cancel the execution in an arbitrary place,
- gain observability into runs via structured events (for logging, tracing, and debugging).

**The Run and Run Context get created** when a `run` method is called on a framework class that can be executed (e.g., `ChatModel`, `Agent`, ...).

The run object has the following **methods**:

- The `on` method registers a callback on the run's emitter.
- The `middleware` method registers middleware (a function that takes `RunContext` as its first parameter, or a class with a `bind` method that takes `RunContext` as its first parameter).
- The `context` method sets data for the given execution. That data is then propagated as metadata in every emitted event.

The target implementation (handler) becomes part of the shared context (`RunContext`), which internally forms a hierarchical tree structure that shares the same context.


In simpler terms, when you call one runnable (e.g., `ChatModel`) from within another runnable (e.g., `Agent`), the inner call (`ChatModel`) is attached to the context of the outer one (`Agent`).

<Tip>The current execution context can be retrieved anytime by calling `RunContext.get()`.</Tip>


### Runnable

The `Runnable[R]` class unifies common objects that can be executed and observed. It is an abstract class with the following traits:

- It has an abstract `run` method that executes the class and returns a `Run[R]` (`R` is bound to the `RunnableOutput`).
- It has an abstract `emitter` getter.
- It has a `middlewares` getter that lists the existing middlewares.

**Invoking a Runnable**

Every runnable takes a list of messages as its first (positional) parameter, followed by these optional keyword arguments (`RunnableOptions`):

- `signal` (an instance of `AbortSignal`) — allows aborting the execution.
- `context` (a dictionary) — used to propagate additional data.

You can also pass extra arguments that may or may not be processed by the given handler.

The `RunnableOutput` has the following properties:

- `output`: a list of messages (can be empty)
- `context`: a dictionary that can store additional data
- `last_message` (getter): returns the last message if it exists, or creates an empty `AssistantMessage` otherwise

**Creating a custom Runnable**

{/* <!-- embedme python/examples/simple_runnable.py --> */}
```py Python [expandable]
import asyncio
from functools import cached_property
from typing import Unpack

from beeai_framework.backend import AnyMessage, AssistantMessage, UserMessage
from beeai_framework.context import RunContext
from beeai_framework.emitter import Emitter
from beeai_framework.runnable import Runnable, RunnableOptions, RunnableOutput, runnable_entry


class GreetingRunnable(Runnable[RunnableOutput]):
    @runnable_entry
    async def run(self, input: list[AnyMessage], /, **kwargs: Unpack[RunnableOptions]) -> RunnableOutput:
        # retrieves the current run context
        run = RunContext.get()

        response = f"Hello, {run.context.get('name', 'stranger')}!"

        # sends an emit so that someone can react to it (optional)
        await run.emitter.emit("before_send", response)

        return RunnableOutput(output=[AssistantMessage(response)])

    @cached_property
    def emitter(self) -> Emitter:
        return Emitter.root().child(namespace=["echo"])


async def main() -> None:
    echo = GreetingRunnable()
    response = await echo.run([UserMessage("Hello!")]).context({"name": "Alex"})
    print(response.last_message.text)


if __name__ == "__main__":
    asyncio.run(main())

```

## Event Handling

Building robust agents requires precise control over the execution lifecycle. You need the ability to not only observe your agent's behavior but also intercept and modify it at specific points.

The following sections cover the mechanics of the BeeAI Framework **event system** and will enable you to manage:
- **Scopes**: Deciding whether to listen globally, per instance, or for a single run.
- **Config**: Controlling listener priority, persistence, and blocking behavior.
- **Lifecycle**: Understanding the exact sequence of events that occur during execution.
- **Debugging**: Inspecting raw event streams to see exactly what your agent is doing.
- **Piping**: Linking emitters together via piping to create unified event streams.

### Scopes

Events can be observed at three different levels.

**1. Global Level**

Every emitter provided by the out-of-the-box modules is a child of the root emitter. This means you can listen to **all events** directly from the root emitter.

```py Python [expandable]
from beeai_framework.emitter import Emitter, EventMeta
from typing import Any

def log_all_events(data: Any, meta: EventMeta) -> None:
    print(
        f"Received event ({meta.id}) with name {meta.name} and path {meta.path}. "
        f"The event was created by {type(meta.creator)} and is of type {type(data)}."
    )

root = Emitter.root()
root.on("*.*", log_all_events)
```

<Note>Listeners that are bound "closer" to the source are executed earlier. For those that reside at the same level, the order can be altered by setting a `priority` value which is part of the `EmitterOptions` class. A higher `priority` value means the listener will be executed earlier. The default priority is `0`.</Note>
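
The ordering rule for listeners on the same level can be sketched in plain Python. `ToyListener` and `order_listeners` are hypothetical names used only for this illustration; the sketch assumes nothing beyond the documented rules (higher priority runs earlier, the default is `0`, and listeners with equal priority keep the order in which they were added).

```py Python [expandable]
from dataclasses import dataclass
from typing import Callable


@dataclass
class ToyListener:
    name: str
    callback: Callable[[], None]
    priority: int = 0  # mirrors the documented default


def order_listeners(listeners: list[ToyListener]) -> list[ToyListener]:
    """Higher priority first; sorted() is stable, so ties keep registration order."""
    return sorted(listeners, key=lambda listener: -listener.priority)


calls: list[str] = []
registered = [
    ToyListener("audit", lambda: calls.append("audit")),
    ToyListener("guard", lambda: calls.append("guard"), priority=10),
    ToyListener("metrics", lambda: calls.append("metrics")),
    ToyListener("cleanup", lambda: calls.append("cleanup"), priority=-5),
]

for listener in order_listeners(registered):
    listener.callback()

print(calls)  # ['guard', 'audit', 'metrics', 'cleanup']
```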

**2. Instance Level**

You can also listen to events emitted by a **specific instance** of a class.

```py Python [expandable]
from beeai_framework.backend.chat import ChatModel, ChatModelStartEvent
from beeai_framework.backend.message import UserMessage
from beeai_framework.emitter import EventMeta
from typing import Any

def change_model_temperature(data: ChatModelStartEvent, meta: EventMeta) -> None:
    print("The chat model triggered a start event. Changing the temperature.")
    data.input.temperature = 0.5

model = ChatModel.from_name("ollama:granite3.3")
model.emitter.on("start", change_model_temperature)

await model.run([UserMessage("Hello!")])
```

This registers the callback on the instance's emitter, so it captures the matching events from every run of that instance.

**3. Run (Invocation) Level**

Sometimes you may want to listen to events emitted by **a single run** of a class.

```py Python [expandable]
from beeai_framework.backend.chat import ChatModel, ChatModelStartEvent
from beeai_framework.backend.message import UserMessage
from beeai_framework.emitter import EventMeta
from typing import Any

model = ChatModel.from_name("ollama:granite4")

def change_model_temperature(data: ChatModelStartEvent, meta: EventMeta) -> None:
    print("The chat model triggered a start event. Changing the temperature.")
    data.input.temperature = 0.5

# Register the listener on this single run only
await model.run([UserMessage("Hello!")]).on("start", change_model_temperature)
```

Here, the callback is registered on the **run instance** (created by the `run` method).
The run’s emitter is a child of the class emitter, allowing you to modify behavior for a single invocation without affecting others.

### Config

When working with multiple callbacks, you may need to control execution order, or ensure that some run exclusively.
You can do this using the optional `options` argument of type `EmitterOptions`.

**Example**

```py Python [expandable]
from beeai_framework.emitter import EmitterOptions, EventMeta
from beeai_framework.backend.chat import ChatModel, ChatModelNewTokenEvent

# Creates a chat model instance
model = ChatModel.from_name("ollama:granite3.3", stream=True)

# Defines a callback that will be executed when a new token is emitted by the model
def cb(data: ChatModelNewTokenEvent, meta: EventMeta):
    print(f"[{meta.id}]: Received chunk", data.value.get_text_content())

# Creates a reference to the emitter
e = model.emitter

# will be executed only once and then gets unregistered (default is False)
e.on("new_token", cb, EmitterOptions(once=True))

# will not be deleted (default is False)
e.on("new_token", cb, EmitterOptions(persistent=True))

# will be executed before those with a lower priority (default is 0), priority can also be negative
e.on("new_token", cb, EmitterOptions(priority=1))

# runs before every other callback with the same priority (default is False)
e.on("new_token", cb, EmitterOptions(is_blocking=True))

# match events that are emitted by the same type of class but executed within a target (e.g., calling agent.run(...) inside another agent.run(...))
e.on("new_token", cb, EmitterOptions(match_nested=True))
```

**Nested events**

Based on the `matcher` parameter (the value used to match the event), the framework
decides whether to include nested events (events created by child emitters or received through piping).

The default value of `match_nested` depends on the type of the `matcher`. You can also set it explicitly, as shown in the example above.

| Matcher Type                       | Default `match_nested` |
|------------------------------------|------------------------|
| String without `.` (event name)    | `False`                |
| String with `.` (event path)       | `True`                 |
| `"*"` (match all top-level events) | `False`                |
| `"*.*"` (match all events)         | `True`                 |
| Regex                              | `True`                 |
| Function                           | `False`                |

<Note>If two listeners have the same priority, they are executed in the order they were added.</Note>
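
The defaults in the table above can be summarized as a small decision function. `default_match_nested` is a hypothetical helper that only restates the table; it is not the framework's implementation, and an explicit `match_nested` value always takes precedence over these defaults.

```py Python [expandable]
import re
from typing import Callable, Union

Matcher = Union[str, re.Pattern, Callable[..., bool]]


def default_match_nested(matcher: Matcher) -> bool:
    """Return the default `match_nested` value for a matcher, following the table above."""
    if isinstance(matcher, str):
        # "*" and plain event names contain no dot; "*.*" and event paths do
        return "." in matcher
    if isinstance(matcher, re.Pattern):
        return True
    if callable(matcher):
        return False
    raise TypeError(f"Unsupported matcher type: {type(matcher)!r}")


print(default_match_nested("start"))                      # False
print(default_match_nested("backend.chat.ollama.start"))  # True
print(default_match_nested("*"))                          # False
print(default_match_nested("*.*"))                        # True
print(default_match_nested(re.compile(r"token$")))        # True
print(default_match_nested(lambda event: True))           # False
```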


### Lifecycle

When a framework component is executed, it creates a run context, which wraps the target handler and allows you to modify its input and output (learn more in the [Run (Context)](#run-context) section).

Once a `Run` instance is executed (i.e., **awaited**), its lifecycle proceeds through the following steps:

1. The `start` event is emitted.
2. The target implementation is executed.
3. Depending on the outcome, either a `success` or `error` event is emitted.
4. Finally, the `finish` event is emitted.

The corresponding events are listed in the following table:

| Event     | Data Type                | Description                      |
| :-------- | :----------------------- | :------------------------------- |
| `start`   | `RunContextStartEvent`   | Triggered when the run starts.   |
| `success` | `RunContextSuccessEvent` | Triggered when the run succeeds. |
| `error`   | `FrameworkError`         | Triggered when an error occurs.  |
| `finish`  | `RunContextFinishEvent`  | Triggered when the run finishes. |

Below is an example showing how to listen to these events:

```py Python [expandable]
import asyncio

from beeai_framework.backend import AnyMessage, AssistantMessage, ChatModelOutput
from beeai_framework.backend.chat import ChatModel, ChatModelNewTokenEvent
from beeai_framework.backend.message import UserMessage
from beeai_framework.context import RunContextStartEvent, RunContextFinishEvent, RunContext
from beeai_framework.emitter import EventMeta
from beeai_framework.emitter.utils import create_internal_event_matcher

model = ChatModel.from_name("ollama:granite3.3")

def change_temperature(data: RunContextStartEvent, meta: EventMeta) -> None:
    """Modify the input of the model.run()"""

    print("debug: changing temperature to 0.5.\n")
    data.input["temperature"] = 0.5 # data.input contains all positional/keyword arguments of the called function


def premature_stop(data: RunContextStartEvent, meta: EventMeta) -> None:
    """Checks whether the input contains malicious text.
    If so, we prevent the ChatModel from executing and immediately return a custom response.
    """

    print("debug: Checking for a malicious input")
    messages: list[AnyMessage] = data.input["input"]  # first parameter
    for message in messages:
        if "bomb" in message.text:
            print("debug: Premature stop detected.")
            data.output = ChatModelOutput(output=[AssistantMessage("Cannot answer that.")])
            break

def validate_new_token(data: ChatModelNewTokenEvent, meta: EventMeta) -> None:
    """Check if the stream contains a malicious word. If so, we abort the run."""

    run = RunContext.get()

    if "fuse" in data.value.get_text_content():
        print(f"Aborting run for user with ID {run.context.get('user_id')}")
        run._controller.abort("Policy violation. Aborting the run.")



response = await (
    model.run([UserMessage("How to make a bomb?")])
    .on("new_token", validate_new_token)
    .on(create_internal_event_matcher("start", model), change_temperature)
    .on(create_internal_event_matcher("start", model), premature_stop)
    .context({"user_id": 123})
)
print("Agent:", response.get_text_content())
```

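<Note>In this example, `create_internal_event_matcher` ensures that the callbacks match the run's internal `start` event (which carries a `RunContextStartEvent`) for this `model` instance, rather than the chat model's own `start` event (which carries a `ChatModelStartEvent`).</Note>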

<Tip>You can retrieve the current run context at any time within a callback using `RunContext.get()`.</Tip>
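
The lifecycle described in this section can be modelled with a few lines of standalone Python. This is a deliberately simplified sketch: `toy_run`, `shout`, `record`, and `block_secret` are hypothetical names, events are plain dictionaries instead of Pydantic models, and the choice to still emit `success` after an early stop is an assumption of the sketch rather than documented behavior.

```py Python [expandable]
import asyncio
from typing import Any, Awaitable, Callable

Listener = Callable[[str, dict], None]


async def toy_run(target: Callable[[Any], Awaitable[Any]], value: Any, listeners: list[Listener]) -> Any:
    """Run `target` while notifying listeners of start, success or error, and finish."""

    def emit(name: str, data: dict) -> None:
        for listener in listeners:
            listener(name, data)

    start = {"input": value, "output": None}
    emit("start", start)
    try:
        if start["output"] is not None:
            # A listener supplied an output during `start`, so the target is skipped
            output = start["output"]
        else:
            output = await target(start["input"])
        success = {"output": output}
        emit("success", success)  # assumption: also emitted after an early stop
        return success["output"]
    except Exception as error:
        emit("error", {"error": error})
        raise
    finally:
        emit("finish", {})


async def shout(text: str) -> str:
    return text.upper()


seen: list[str] = []


def record(name: str, data: dict) -> None:
    seen.append(name)


def block_secret(name: str, data: dict) -> None:
    if name == "start" and "secret" in data["input"]:
        data["output"] = "Cannot answer that."


async def main() -> None:
    print(await toy_run(shout, "hello", [record, block_secret]))      # HELLO
    print(await toy_run(shout, "my secret", [record, block_secret]))  # Cannot answer that.
    print(seen)


asyncio.run(main())
```

Because listeners receive the same mutable event object that the run inspects afterwards, changing `input` or `output` inside a listener is enough to alter the run's behavior.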

### Debugging

While the [Global Trajectory](#global-trajectory) middleware is excellent for visualizing the structural hierarchy of a run, sometimes you need to inspect the raw stream of events as they happen.

To do this quickly without setting up a full middleware class, you can register a wildcard listener (`*.*`) directly on your run. This captures every single event emitted during that specific execution.

```py Python [expandable]
agent = RequirementAgent(llm="ollama:granite3.3", tools=[OpenMeteoTool()])
response = await (
    agent.run("What's the current weather in Miami?")
    .on("*.*", lambda data, meta: print(meta.path, "by", type(meta.creator)))
)
```

### Piping

In some cases, one might want to propagate all events from one emitter to another (for instance when creating a child emitter).

<CodeGroup>

{/* <!-- embedme python/examples/emitter/piping.py --> */}
```py Python [expandable]
import asyncio
import sys
import traceback

from beeai_framework.emitter import Emitter
from beeai_framework.errors import FrameworkError


async def main() -> None:
    first: Emitter = Emitter(namespace=["app"])

    first.on(
        "*.*",
        lambda data, event: print(
            f"'first' has retrieved the following event '{event.path}', isDirect: {event.source == first}"
        ),
    )

    second: Emitter = Emitter(namespace=["app", "llm"])

    second.on(
        "*.*",
        lambda data, event: print(
            f"'second' has retrieved the following event '{event.path}', isDirect: {event.source == second}"
        ),
    )

    # Propagate all events from the 'second' emitter to the 'first' emitter
    unpipe = second.pipe(first)

    await first.emit("a", {})
    await second.emit("b", {})

    print("Unpipe")
    unpipe()

    await first.emit("c", {})
    await second.emit("d", {})


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except FrameworkError as e:
        traceback.print_exc()
        sys.exit(e.explain())

```

{/* <!-- embedme typescript/examples/emitter/piping.ts --> */}
```ts TypeScript [expandable]
import { Emitter, EventMeta } from "beeai-framework/emitter/emitter";

const first = new Emitter({
  namespace: ["app"],
});

first.match("*.*", (data: unknown, event: EventMeta) => {
  console.log(
    `'first' has retrieved the following event '${event.path}', isDirect: ${event.source === first}`,
  );
});

const second = new Emitter({
  namespace: ["app", "llm"],
});
second.match("*.*", (data: unknown, event: EventMeta) => {
  console.log(
    `'second' has retrieved the following event '${event.path}', isDirect: ${event.source === second}`,
  );
});

// Propagate all events from the 'second' emitter to the 'first' emitter
const unpipe = second.pipe(first);

await first.emit("a", {});
await second.emit("b", {});

console.log("Unpipe");
unpipe();

await first.emit("c", {});
await second.emit("d", {});

```

</CodeGroup>
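
To make the direction of piping explicit, the following toy model reproduces the scenario without the framework. `ToyEmitter` is a hypothetical class with a much smaller surface than the real `Emitter`; it only illustrates that events flow from the piped emitter to its target, and that calling the returned function removes the link.

```py Python [expandable]
from typing import Callable, Optional


class ToyEmitter:
    def __init__(self, name: str) -> None:
        self.name = name
        self.listeners: list[Callable[[str, str], None]] = []
        self.pipes: list["ToyEmitter"] = []

    def on(self, callback: Callable[[str, str], None]) -> None:
        self.listeners.append(callback)

    def pipe(self, target: "ToyEmitter") -> Callable[[], None]:
        """Forward every event to `target`; the returned function undoes the link."""
        self.pipes.append(target)
        return lambda: self.pipes.remove(target)

    def emit(self, event: str, source: Optional[str] = None) -> None:
        origin = source or self.name
        for callback in self.listeners:
            callback(event, origin)
        for target in self.pipes:
            target.emit(event, origin)


log: list[str] = []
first, second = ToyEmitter("first"), ToyEmitter("second")
first.on(lambda event, origin: log.append(f"first got {event} from {origin}"))
second.on(lambda event, origin: log.append(f"second got {event} from {origin}"))

unpipe = second.pipe(first)
first.emit("a")   # only 'first' hears this
second.emit("b")  # both hear this, because 'second' is piped into 'first'
unpipe()
first.emit("c")
second.emit("d")  # only 'second' hears this after unpiping

print("\n".join(log))
```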


## Creating Custom Middleware

While you can register individual callbacks to handle specific events, this approach can become cluttered if you have complex logic. To make your event handling reusable and modular, the BeeAI framework allows you to group listeners into a class called **Middleware.**

### When to use Middleware vs. Callbacks

* **Use Callbacks (`.on` / `.match`)**: For simple, one-off logic, such as logging a specific event or debugging a single run.
* **Use Middleware**: When the logic is complex, multi-step, or needs to be reused across different parts of your application.

### The Middleware Protocol

A middleware component is defined by how it interacts with the `RunContext`. It can be structured in two ways:

1. **A Function**: A simple function that accepts `RunContext` as its first parameter.

2. **A Class**: A class that implements a `bind` method, which accepts `RunContext` as its first parameter.

The `RunContext` provides access to the emitter, the instance being run, and the shared memory for that specific execution.
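
The two shapes can be compared side by side in a standalone sketch. `FakeRunContext`, `apply_middleware`, `log_start`, and `LogFinish` are hypothetical names invented for this illustration, and the fake context exposes a simplified `on` method instead of a real emitter. The point is only that a caller can accept either a plain function or an object with a `bind` method.

```py Python [expandable]
from typing import Any, Callable


class FakeRunContext:
    """Hypothetical stand-in for `RunContext` that only records registered listeners."""

    def __init__(self) -> None:
        self.listeners: list[tuple[str, Callable[..., Any]]] = []

    def on(self, event: str, callback: Callable[..., Any]) -> None:
        self.listeners.append((event, callback))


def apply_middleware(ctx: FakeRunContext, middleware: Any) -> None:
    """Accept both documented shapes: an object with `bind`, or a plain function."""
    if hasattr(middleware, "bind"):
        middleware.bind(ctx)
    else:
        middleware(ctx)


# Shape 1: a function that takes the run context as its first parameter
def log_start(ctx: FakeRunContext) -> None:
    ctx.on("start", lambda data, meta: print("run started"))


# Shape 2: a class with a `bind` method that takes the run context
class LogFinish:
    def bind(self, ctx: FakeRunContext) -> None:
        ctx.on("finish", lambda data, meta: print("run finished"))


ctx = FakeRunContext()
for middleware in (log_start, LogFinish()):
    apply_middleware(ctx, middleware)

print([event for event, _ in ctx.listeners])  # ['start', 'finish']
```

The class form is usually the better fit when the middleware needs configuration or state, since both can live on the instance.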

### Example: Intercepting and Overriding

A common use case for middleware is intercepting a request before the target component executes to modify the input or provide a mock response.

The following example demonstrates a middleware that intercepts the `start` event. By setting the `output` property on the event data, the middleware effectively "mocks" the result, preventing the actual ChatModel from running.

{/* <!-- embedme python/examples/middleware/override.py --> */}

```py Python [expandable]
import asyncio
from typing import Any

from beeai_framework.backend import AssistantMessage, ChatModel, ChatModelOutput, UserMessage
from beeai_framework.context import RunContext, RunContextStartEvent, RunMiddlewareProtocol
from beeai_framework.emitter import EmitterOptions, EventMeta
from beeai_framework.emitter.utils import create_internal_event_matcher


class OverrideResponseMiddleware(RunMiddlewareProtocol):
    """Middleware that sets the result value for a given runnable without executing it"""

    def __init__(self, result: Any) -> None:
        self._result = result

    def bind(self, ctx: RunContext) -> None:
        """Called once the target is about to be run."""

        ctx.emitter.on(
            create_internal_event_matcher("start", ctx.instance),
            self._run,
            # ensures that this callback will be the first invoked
            EmitterOptions(is_blocking=True, priority=1),
        )

    async def _run(self, data: RunContextStartEvent, meta: EventMeta) -> None:
        """Set the output property to the stored result, which prevents the target handler from executing."""

        data.output = self._result


async def main() -> None:
    middleware = OverrideResponseMiddleware(ChatModelOutput(output=[AssistantMessage("BeeAI is the best!")]))
    response = await ChatModel.from_name("ollama:granite4:micro").run([UserMessage("Hello!")]).middleware(middleware)
    print(response.get_text_content())  # "BeeAI is the best!"


if __name__ == "__main__":
    asyncio.run(main())
```

**Key Implementation Details:**

* `create_internal_event_matcher`: A helper used to ensure you are matching the specific internal event (like `start` / `success` / `error` / `finish`) for the correct component instance.
* `EmitterOptions`: Used here to set `priority=1` and `is_blocking=True`, ensuring this middleware executes early and takes precedence over other callbacks.
* `data.output`: Setting this property during a `start` event signals the framework to skip the underlying execution (e.g., the LLM call) and return this value immediately.

<Note>Ensure that your mock response matches the expected output type of the component you are intercepting. For example, if you override a `ChatModel`, the return type must be `ChatModelOutput`.</Note>

### Registering Middleware

Once defined, you can attach middleware to a component using the `.middleware()` method just before execution.

```py Python [expandable]
async def main() -> None:
  # 1. Prepare the mock response
  mock_output = ChatModelOutput(output=[AssistantMessage("BeeAI is the best!")])

  # 2. Initialize the middleware
  middleware = OverrideResponseMiddleware(mock_output)

  # 3. Attach middleware to the run
  llm = ChatModel.from_name("ollama:granite4")
  response = await llm.run([UserMessage("Hello!")]).middleware(middleware)

  print(response.get_text_content())  # Output: "BeeAI is the best!"

if __name__ == "__main__":
  asyncio.run(main())
```

Note that middleware is applied to the **Run** instance (the result of calling `.run()`), not to the component or its emitter. However, some components also accept middleware through their constructor, if supported.

## Events glossary

The following sections list all events that can be observed for built-in components. Custom tools, agents, and other components can emit additional events.

### Tools

The following events can be observed when calling `Tool.run(...)`.

| Event     | Data Type          | Description                                                              |
| :-------- | :----------------- | :----------------------------------------------------------------------- |
| `start`   | `ToolStartEvent`   | Triggered when a tool starts executing.                                  |
| `success` | `ToolSuccessEvent` | Triggered when a tool completes execution successfully.                  |
| `error`   | `ToolErrorEvent`   | Triggered when a tool encounters an error.                               |
| `retry`   | `ToolRetryEvent`   | Triggered when a tool operation is being retried.                        |
| `finish`  | `None`             | Triggered when tool execution finishes (regardless of success or error). |


### Chat Models

The following events can be observed when calling `ChatModel.run(...)`.

| Event        | Data Type                | Description                                                                |
| :----------- | :----------------------- | :------------------------------------------------------------------------- |
| `start`      | `ChatModelStartEvent`    | Triggered when model generation begins.                                    |
| `new_token`  | `ChatModelNewTokenEvent` | Triggered when a new token is generated during streaming. Streaming must be enabled. |
| `success`    | `ChatModelSuccessEvent`  | Triggered when the model generation completes successfully.                |
| `error`      | `ChatModelErrorEvent`    | Triggered when model generation encounters an error.                       |
| `finish`     | `None`                   | Triggered when model generation finishes (regardless of success or error). |

[Check out the in-code definition](https://github.com/i-am-bee/beeai-framework/blob/main/python/beeai_framework/backend/events.py).


### Requirement Agent

The following events can be observed when calling `RequirementAgent.run(...)`.

| Event          | Data Type                          | Description                                                                       |
|:---------------|:-----------------------------------|:----------------------------------------------------------------------------------|
| `start`        | `RequirementAgentStartEvent`       | Triggered when the agent begins execution.                                        |
| `success`      | `RequirementAgentSuccessEvent`     | Triggered when the agent successfully completes execution.                        |
| `final_answer` | `RequirementAgentFinalAnswerEvent` | Triggered with intermediate chunks of the final answer. |

[Check out the in-code definition](https://github.com/i-am-bee/beeai-framework/blob/main/python/beeai_framework/agents/experimental/events.py).



### ToolCalling Agent

The following events can be observed when calling `ToolCallingAgent.run(...)`.


| Event     | Data Type                      | Description                                                |
| :-------- | :----------------------------- | :--------------------------------------------------------- |
| `start`   | `ToolCallingAgentStartEvent`   | Triggered when the agent begins execution.                 |
| `success` | `ToolCallingAgentSuccessEvent` | Triggered when the agent successfully completes execution. |

[Check out the in-code definition](https://github.com/i-am-bee/beeai-framework/blob/main/python/beeai_framework/agents/tool_calling/events.py).

### ReAct Agent

The following events can be observed when calling `ReActAgent.run(...)`.


| Event                         | Data Type                | Description                                                |
| :---------------------------- | :----------------------- | :--------------------------------------------------------- |
| `start`                       | `ReActAgentStartEvent`   | Triggered when the agent begins execution.                 |
| `error`                       | `ReActAgentErrorEvent`   | Triggered when the agent encounters an error.              |
| `retry`                       | `ReActAgentRetryEvent`   | Triggered when the agent is retrying an operation.         |
| `success`                     | `ReActAgentSuccessEvent` | Triggered when the agent successfully completes execution. |
| `update` and `partial_update` | `ReActAgentUpdateEvent`  | Triggered when the agent updates its state.                |

[Check out the in-code definition](https://github.com/i-am-bee/beeai-framework/blob/main/python/beeai_framework/agents/react/events.py).

[Check out the in-code definition of the tool events](https://github.com/i-am-bee/beeai-framework/blob/main/python/beeai_framework/tools/events.py).


### Workflow

The following events can be observed when calling `Workflow.run(...)`.

| Event     | Data Type              | Description                                            |
| :-------- | :--------------------- | :----------------------------------------------------- |
| `start`   | `WorkflowStartEvent`   | Triggered when a workflow step begins execution.       |
| `success` | `WorkflowSuccessEvent` | Triggered when a workflow step completes successfully. |
| `error`   | `WorkflowErrorEvent`   | Triggered when a workflow step encounters an error.    |

[Check out the in-code definition](https://github.com/i-am-bee/beeai-framework/blob/main/python/beeai_framework/workflows/events.py).


### LinePrefixParser

The following events are caught internally by the `LinePrefixParser`.

| Event            | Data Type                | Description                             |
| :--------------- | :----------------------- | :-------------------------------------- |
| `update`         | `LinePrefixParserUpdate` | Triggered when an update occurs.        |
| `partial_update` | `LinePrefixParserUpdate` | Triggered when a partial update occurs. |

### StreamToolCallMiddleware

The following events are caught internally by the `StreamToolCallMiddleware`.

| Event    | Data Type                             | Description                      |
| :------- | :------------------------------------ | :------------------------------- |
| `update` | `StreamToolCallMiddlewareUpdateEvent` | Triggered when an update occurs. |

[Check out the in-code definition](https://github.com/i-am-bee/beeai-framework/blob/main/python/beeai_framework/middleware/stream_tool_call.py).

### GlobalTrajectoryMiddleware

The following events are handled internally by the `GlobalTrajectoryMiddleware`:

| Event     | Data Type                                | Description                                                                                 |
|:----------|:-----------------------------------------|:--------------------------------------------------------------------------------------------|
| `start`   | `GlobalTrajectoryMiddlewareStartEvent`   | Triggered when a target begins execution.                                                   |
| `success` | `GlobalTrajectoryMiddlewareSuccessEvent` | Triggered when a target completes successfully.                                             |
| `error`   | `GlobalTrajectoryMiddlewareErrorEvent`   | Triggered when an error occurs during target execution.                                     |
| `finish`  | `GlobalTrajectoryMiddlewareFinishEvent`  | Triggered after a target has finished execution, regardless of success or failure.          |

All events inherit from the `GlobalTrajectoryMiddlewareEvent` class.

```py Python [expandable]
class GlobalTrajectoryMiddlewareEvent(BaseModel):
    message: str
    level: TraceLevel
    origin: tuple[Any, EventMeta]
```

The first element of the `origin` tuple is the original event emitted by the `RunContext` (for example, a `RunContextStartEvent` for `start`); the second element is its `EventMeta`.

### RunContext

These are special events that wrap the execution of the target's handler; the `start` event is emitted before the handler is executed.
A run event contains `.run.` in its event path and has `internal` set to true in the event's context object.

| Event     | Data Type                | Description                                                                                                                                                                                                  |
| :-------- |:-------------------------|:-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `start`   | `RunContextStartEvent`   | Triggered when the run starts. Has an `input` property (the positional and keyword arguments the function was called with) and an `output` property. Set the `output` property to prevent the target handler from executing. |
| `success` | `RunContextSuccessEvent` | Triggered when the run succeeds. |
| `error`   | `FrameworkError`         | Triggered when an error occurs. |
| `finish`  | `RunContextFinishEvent`  | Triggered when the run finishes. |

[Check out the in-code definition](https://github.com/i-am-bee/beeai-framework/blob/main/python/beeai_framework/context.py#L260-L273).

<Tip>Instead of matching these events manually, use the `create_internal_event_matcher` helper function.</Tip>
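
For reference, the manual matching rule stated in this section (the path contains `.run.` and `internal` is true in the context) can be sketched without the framework. `ToyEventMeta`, `is_run_event`, and the example paths are hypothetical. Unlike the real helper, this sketch does not check which component instance emitted the event.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class ToyEventMeta:
    """Hypothetical stand-in exposing only the fields the rule needs."""

    name: str
    path: str
    context: dict[str, Any] = field(default_factory=dict)


def is_run_event(meta: ToyEventMeta, name: str) -> bool:
    """Return True for an internal run event with the given name."""
    return (
        meta.name == name
        and ".run." in meta.path
        and meta.context.get("internal") is True
    )


# Example metadata with made-up paths.
run_start = ToyEventMeta("start", "tool.weather.run.start", {"internal": True})
tool_start = ToyEventMeta("start", "tool.weather.start")
```

Here `is_run_event(run_start, "start")` matches, while `tool_start` does not, because its path lacks `.run.` and its context does not mark it as internal.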

## Examples

<CardGroup cols={2}>
  <Card title="Python" icon="python" href="https://github.com/i-am-bee/beeai-framework/tree/main/python/examples/middleware">
    Explore reference middleware implementations in Python.
  </Card>
  <Card title="TypeScript" icon="js" href="https://github.com/i-am-bee/beeai-framework/tree/main/typescript/examples/middleware">
    Explore reference middleware implementations in TypeScript.
  </Card>
</CardGroup>
